---
layout: post
date: 2024-10-15
title: "TCP Server in Zig - Part 5a - Poll "
description: "Using non-blocking sockets and poll to improve the scalability of our system."
tags: [zig]
---

<p>One of the reasons we introduced multithreading was to get around the fact that our <code>read</code> and, to a lesser extent, <code>accept</code> and <code>write</code>, block. In our initial single-threaded implementation, rather than pushing our server to its limits, we spent a lot of time idle, waiting for data to come in. Multithreading helped to unblock the main thread so that new connections could be accepted - as long as we had enough workers to handle them - while processing existing connections. But threads are relatively heavyweight constructs and it isn't particularly efficient to spawn them and then have them blocked waiting for data.</p>

<p>There are two complementary parts to improving our design: non-blocking I/O and event-notification.</p>

<aside><p>As our code gets more complicated, we'll look at only part of the implementation, such as an individual function. If you want additional context, you can find a <a href=#appendix-a>complete runnable example</a> at the end.</p></aside>

<h3 id=nonblocking><a href="#nonblocking" aria-hidden=true>Non-Blocking I/O</a></h3>
<p>It's possible to put a socket in non-blocking mode. When enabled, functions which normally block, such as <code>accept</code>, <code>read</code> or <code>write</code>, will return <code>error.WouldBlock</code> (or <code>EAGAIN</code> in C) rather than blocking. As we're about to see, it's hard to take advantage of this on its own, but we're looking at it first, to get a feel for it. Consider what happens if we go to one of our earlier single-threaded implementations and enable non-blocking sockets (two lines have been changed, both commented):</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    // ADDED: ` | posix.SOCK.NONBLOCK`
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    var buf: [128]u8 = undefined;
    while (true) {
        // Replaced: `0` with `posix.SOCK.NONBLOCK`
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
            std.debug.print("error accept: {}\n", .{err});
            continue;
        };
        defer posix.close(socket);

        const stream = std.net.Stream{.handle = socket};
        const read = try stream.read(&buf);
        if (read == 0) {
            continue;
        }
        try stream.writeAll(buf[0..read]);
    }
}
{% endhighlight %}

<p>When we create our listening socket, using <code>posix.socket</code>, we now set the <code>SOCK.NONBLOCK</code> flag. Similarly, when we <code>accept</code> we now pass that same flag, <code>SOCK.NONBLOCK</code>, as our fourth parameter. The first usage puts our listening socket in non-blocking mode (we'll see what that accomplishes shortly). The second usage puts a newly connected socket in non-blocking mode. This second usage is a special case. There are actually two <code>accept</code> system calls: <code>accept</code> and <code>accept4</code>. The first one, <code>accept</code>, only takes 3 parameters whereas <code>accept4</code> takes a 4th parameter. That 4th parameter is for any flags, like <code>SOCK.NONBLOCK</code>, we want to apply to the connected socket. If Zig's <code>posix.accept</code> detects that <code>accept4</code> is available, it uses it, otherwise it calls <code>accept</code> and then calls <code>fcntl</code> to set the appropriate flags. <code>accept4</code> does what Part 3 talked about (minimizing system calls) by combining <code>accept</code> with <code>fcntl</code>.</p>

<aside><p><code>fcntl</code> is similar to the <code>setsockopt</code> we've already seen, but isn't socket-specific. The <code>NONBLOCK</code> flag isn't socket-specific either, it can be applied to other types of file descriptors (like filesystem files), so we use <code>fcntl</code> to set those more generic flags on our socket.</p></aside>
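
<p>As a rough sketch of that fallback, this is what setting the flag by hand could look like. The <code>setNonBlocking</code> helper is hypothetical (it isn't part of this series or of Zig's standard library), and it assumes a POSIX target and a Zig version where <code>posix.O</code> is a packed struct, which is why the bit is computed with <code>@bitOffsetOf</code>. A pipe stands in for a socket to keep the example self-contained:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: puts an existing file descriptor in non-blocking
// mode using two fcntl calls (get the flags, then set them back).
fn setNonBlocking(fd: posix.fd_t) !void {
    // read the current file status flags
    var flags = try posix.fcntl(fd, posix.F.GETFL, 0);

    // assumption: posix.O is a packed struct, so we derive the NONBLOCK
    // bit from its offset rather than OR-ing a named constant
    flags |= 1 << @bitOffsetOf(posix.O, "NONBLOCK");

    // write the updated flags back
    _ = try posix.fcntl(fd, posix.F.SETFL, flags);
}

pub fn main() !void {
    // a pipe is also a file descriptor, so the same flag applies
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    try setNonBlocking(fds[0]);

    var buf: [16]u8 = undefined;
    // nothing has been written, so a non-blocking read fails right away
    // instead of waiting for data
    _ = posix.read(fds[0], &buf) catch |err| {
        std.debug.print("read: {}\n", .{err});
        return;
    };
}
{% endhighlight %}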

<p>If you try to run the above code, you should get a rapid and endless stream of:</p>

{% highlight text %}
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
error accept: error.WouldBlock
{% endhighlight %}

<p>We said that <code>SOCK.NONBLOCK</code> made it so functions like <code>accept</code> return an error instead of blocking, given that our <code>accept</code> looks like:</p>

{% highlight zig %}
while (true) {
    const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| {
        std.debug.print("error accept: {}\n", .{err});
        continue;
    };
    // ...
}
{% endhighlight %}

<p>The output makes sense. There's no obvious solution either. We could sleep whenever <code>accept</code> returns <code>error.WouldBlock</code>, but that would be worse than leaving the socket in blocking mode - at least in blocking mode we're woken up as soon as there's a connection waiting to be accepted.</p>

<p>If we keep our listening socket in blocking mode, but put connected sockets in non-blocking mode, we'll face the same problems. When our <code>read</code> returns <code>error.WouldBlock</code> what can we realistically do? All of our implementations so far depend on one thread processing one connection. We need to fundamentally rethink our approach.</p>

<h3 id=polling><a href="#polling" aria-hidden=true>Polling</a></h3>
<p>We need to break out of the one-thread-per-connection pattern, which is something we can only do now that we've discovered non-blocking I/O. We probably need to start organizing our code a little better, but you can start thinking about having an array of sockets that we can loop through and try to read from. As an incomplete prototype, something like:</p>

{% highlight zig %}
for (sockets) |s| {
    const n = posix.read(s, &buf) catch |err| {
        switch (err) {
            error.WouldBlock => {},
            else => {
                posix.close(s);
                // TODO: remove the socket from our array
            },
        }
        // go to the next socket, this one isn't ready yet
        continue;
    };

    if (n == 0) {
        posix.close(s);
        // TODO: remove the socket from our array
        continue;
    }

    process(buf[0..n]);
}
{% endhighlight %}

<p>This is possible because <code>posix.read</code> won't block when no data is available. If <code>error.WouldBlock</code> is returned, we can skip to the next socket in our list. There are at least two major problems with this prototype. The first is that this will result in a tight loop when no socket is ready. We previously talked about the importance of minimizing system calls, but the above code would result in a massive number of calls to <code>read</code> as we constantly poll each socket, hoping that one has data.</p>

<p>A larger issue is the need to associate state with each connection. This was trivial in our thread-per-connection model, but gets more complicated now that we need to track multiple connections. For example, if we consider that a <code>read</code> is likely to return less than or more than a single "message" (<a href=/TCP-Server-In-Zig-Part-2-Message-Boundaries/>see Part 2</a>), it seems unlikely that we can share a buffer for all sockets. This is something that we started solving when we introduced a <code>Client</code>. Our above prototype can be improved if we think in terms of <code>clients</code> rather than <code>sockets</code>:</p>

{% highlight zig %}
for (clients) |*client| {
    while (true) {
        const message = client.readMessage() catch {
            client.close();
            // TODO: remove client from our clients array
            break;
        } orelse break; // no message (either not enough bytes, or WouldBlock)

        client.process(message) catch {
            client.close();
            // TODO: remove client from our clients array
            break;
        };
    }
}
{% endhighlight %}

<p>We're getting further and further away from working code, but the key change here is that a <code>Client</code> encapsulates the state necessary for reading and [eventually] writing messages. Our fictional <code>readMessage</code> could fail, but it could also return null to indicate that there isn't enough data to form a complete message yet - maybe because we don't have all the bytes, or maybe because <code>posix.read</code> returned <code>error.WouldBlock</code>. If you're wondering why we have an inner-loop, recall that we're not just concerned about reading less than a whole message, but also about reading more than a single message. It's possible that <code>readMessage</code>, which would eventually call <code>posix.read</code>, fills our buffer with multiple messages. We need to process them all.</p>

<p>So far, this is admittedly very abstract, but recall that in <a href=/TCP-Server-In-Zig-Part-3-Minimizing-Writes-and-Reads/#overread>Part 3</a> we created a stateful message-aware <code>Reader</code>. Our <code>client.readMessage</code> can be a wrapper around <code>reader.readMessage</code> which translates <code>error.WouldBlock</code> into <code>null</code>:</p>

{% highlight zig %}
fn readMessage(self: *Client) !?[]u8 {
    return self.reader.readMessage() catch |err| switch (err) {
        error.WouldBlock => return null,
        else => return err,
    };
}
{% endhighlight %}

<p>While this seems promising, constantly looping through our clients, hoping that one or more has data, isn't going to be efficient.</p>

<h3 id=poll><a href="#poll" aria-hidden=true>poll</a></h3>
<p>The <code>poll</code> system call lets us register file descriptors (like sockets) with the operating system and be notified when certain events, like reading or writing, can be done without blocking. For example, we can give <code>poll</code> an array of sockets and it'll block until one is ready to be read. It's a simple API, a single function, but it's a change in how we think about serving clients. We're going to start by looking at a basic implementation which is unconcerned with things like message boundaries. Our goal, for now, is to get familiar with <code>poll</code>'s API and this new way of working with sockets.</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // Our server can support 4095 clients. Wait, shouldn't that be 4096? No
    // One of the polling slots (the first one) is reserved for our listening
    // socket.

    var polls: [4096]posix.pollfd = undefined;
    polls[0] = .{
        .fd = listener,
        .events = posix.POLL.IN,
        .revents = 0,
    };
    var poll_count: usize = 1;

    while (true) {
        // polls is the total number of connections we can monitor, but
        // polls[0..poll_count] is the listening socket + the clients that are
        // currently connected (remember, the upper bound is exclusive)
        var active = polls[0..poll_count];

        // 2nd argument is the timeout, -1 is infinity
        _ = try posix.poll(active, -1);

        // Active[0] is _always_ the listening socket. When this socket is ready
        // we can accept. Putting it outside the following while loop means that
        // we don't have to check if this is the listening socket on each
        // iteration
        if (active[0].revents != 0) {
            // The listening socket is ready, accept!
            // Notice that we pass SOCK.NONBLOCK to accept, placing the new client
            // socket in non-blocking mode. Also, for now, for simplicity,
            // we're not capturing the client address (the two null arguments).
            const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);

            // Add this new client socket to our polls array for monitoring
            polls[poll_count] = .{
                .fd = socket,

                // This will be SET by posix.poll to tell us what event is ready
                // (or it will stay 0 if this socket isn't ready)
                .revents = 0,

                // We want to be notified about the POLL.IN event
                // (i.e. can read without blocking)
                .events = posix.POLL.IN,
            };

            // increment the number of active connections we're monitoring
            // this can overflow our 4096 polls array. TODO: fix that!
            poll_count += 1;
        }

        var i: usize = 1;
        while (i < active.len) {
            const polled = active[i];

            const revents = polled.revents;
            if (revents == 0) {
                // This socket isn't ready, go to the next one
                i += 1;
                continue;
            }

            var closed = false;

            // the socket is ready to be read
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                var buf: [4096]u8 = undefined;
                const read = posix.read(polled.fd, &buf) catch 0;
                if (read == 0) {
                    // probably closed on the other side
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{polled.fd, buf[0..read]});
                }
            }

            // either the read failed, or we're being notified through poll
            // that the socket is closed
            if (closed or (revents & posix.POLL.HUP == posix.POLL.HUP)) {
                posix.close(polled.fd);

                // We use a simple trick to remove it: we swap it with the last
                // item in use, then "shrink" our count by 1. We index polls
                // rather than active because a socket accepted earlier in this
                // iteration sits at polls[poll_count - 1], just past active.
                const last_index = poll_count - 1;
                polls[i] = polls[last_index];
                poll_count = last_index;
                active = polls[0..@min(active.len, poll_count)];

                // don't increment `i` because we swapped out the removed item
                // and shrank the array
            } else {
                // not closed, go to the next socket
                i += 1;
            }
        }
    }
}
{% endhighlight %}

<p>The above is a working example. You can run it and connect up to 4095 clients - any more and it'll crash, something we can and will eventually fix. The above code is documented, but there are a number of things to go over. The <code>posix.pollfd</code> structure has three fields:</p>

<ol>
    <li><code>fd</code> - The file descriptor that we're polling,
    <li><code>events</code> - A bitwise list of events we care about, for now that's only <code>POLL.IN</code> but later, when we look at writing, it'll be <code>POLL.IN | POLL.OUT</code>,
    <li><code>revents</code> - The ready events set by the <code>poll</code> system call. This tells us which, if any, events are ready.
</ol>

<p>The first thing we do is set up a <code>pollfd</code> for our listening socket, registering our interest in <code>POLL.IN</code>. For a listening socket <code>POLL.IN</code> indicates that we can <code>accept</code> without blocking. We set and keep this at index zero throughout the lifetime of our program. When <code>posix.poll</code> returns, it means that at least one of the monitored file descriptors is ready. We iterate through them all, looking for any where <code>revents != 0</code>. We special-case our listening socket, always at <code>active[0]</code> - we need to call <code>accept</code> and process the new connection. As an optimization, we've moved that out of our loop. This means we don't have to check <code>if (i == 0)</code> for each iteration of our loop.</p>
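
<p>Since <code>events</code> and <code>revents</code> are both bit fields, checking them comes down to a bitwise AND. Here's a small sketch of that test in isolation. The <code>isReadable</code> and <code>isHungUp</code> helpers are made up for this example, and we fill in <code>revents</code> by hand only to exercise them - in real code, <code>poll</code> is the one that writes to that field:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical helpers, named for this example only
fn isReadable(p: posix.pollfd) bool {
    return p.revents & posix.POLL.IN != 0;
}

fn isHungUp(p: posix.pollfd) bool {
    return p.revents & posix.POLL.HUP != 0;
}

pub fn main() void {
    // revents is set by hand here; the fd is never passed to the OS
    const both: posix.pollfd = .{
        .fd = 0,
        .events = posix.POLL.IN,
        .revents = posix.POLL.IN | posix.POLL.HUP,
    };
    const idle: posix.pollfd = .{
        .fd = 0,
        .events = posix.POLL.IN,
        .revents = 0,
    };

    std.debug.print("both: readable={} hup={}\n", .{ isReadable(both), isHungUp(both) });
    std.debug.print("idle: readable={} hup={}\n", .{ isReadable(idle), isHungUp(idle) });
}
{% endhighlight %}

<p>Testing with <code>!= 0</code> or with <code>== posix.POLL.IN</code>, as the server code does, is equivalent when checking a single flag.</p>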

<p>We could have used an <code>std.ArrayList(posix.pollfd)</code> to make our life of adding and removing entries a little easier, but, at least for the purpose of learning, I prefer the explicit code to handle new connections and disconnects.</p>

<p>You might be wondering why we check if <code>revents</code> has the <code>POLL.HUP</code> flag, even though we didn't register our interest in <code>POLL.HUP</code>. <code>POLL.HUP</code> is always monitored even if we don't explicitly ask for it. We check <code>POLL.IN</code> first, but we could check <code>POLL.HUP</code> first instead. For example, an HTTP server might prefer checking <code>POLL.HUP</code> and removing disconnected clients, ignoring any pending data they've sent. When <code>POLL.IN</code> <em>is</em> set, you might be wondering if <code>read</code> can fail and/or return zero bytes. It can: when the other side closes the connection, the socket is reported as readable and <code>read</code> returns zero bytes, which is why the code above treats a zero-length read as a disconnect. It would also be possible for <code>read</code> to return <code>error.WouldBlock</code> if another thread called <code>read</code> on the same socket, draining it. And, we could get zero bytes if the supplied buffer passed into <code>read</code> was zero-length. In this single-threaded example, with a fixed-length buffer, neither of those last two cases is possible. But I think it's better to be safe than sorry, and we should always assume <code>read</code> can return zero bytes or an error.</p>
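
<p>Our example lumps every failure together with <code>catch 0</code>. If you'd rather keep "not ready" separate from "gone", one option is to have the read return a small tagged union. This is only a sketch: <code>ReadResult</code> and <code>readOnce</code> are hypothetical names, the buffer is assumed to be non-empty, and a pipe stands in for a socket so the example can run on its own:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical result type: the three outcomes we care about
const ReadResult = union(enum) {
    data: []const u8,
    not_ready,
    closed,
};

// Assumes buf.len > 0, otherwise a 0-byte read would look like a close
fn readOnce(fd: posix.fd_t, buf: []u8) ReadResult {
    const n = posix.read(fd, buf) catch |err| switch (err) {
        // only possible in non-blocking mode: nothing to read right now
        error.WouldBlock => return .not_ready,
        // any other error, we give up on the connection
        else => return .closed,
    };
    // 0 bytes means the other side closed the connection
    if (n == 0) return .closed;
    return .{ .data = buf[0..n] };
}

pub fn main() !void {
    const fds = try posix.pipe();
    defer posix.close(fds[0]);

    var buf: [16]u8 = undefined;

    _ = try posix.write(fds[1], "hi");
    std.debug.print("{s}\n", .{@tagName(readOnce(fds[0], &buf))});

    // closing the write end is our stand-in for a client disconnecting
    posix.close(fds[1]);
    std.debug.print("{s}\n", .{@tagName(readOnce(fds[0], &buf))});
}
{% endhighlight %}

<p>In the polling loop, <code>.not_ready</code> would mean "move on to the next socket" while <code>.closed</code> would trigger the swap-and-shrink removal.</p>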

<h3 id=O-N><a href="#O-N" aria-hidden=true>O(N)</a></h3>
<p>The <code>poll</code> call unblocks when at least one of the monitored file descriptors is ready. But the only way to know which are ready is to iterate through all of them, checking if <code>revents != 0</code>. Even with a modest upper limit of 4K clients, this isn't ideal. There isn't much we can do about it. However, <code>poll</code> <em>does</em> return the number of entries that have a non-zero <code>revents</code>. So while our code will still be O(N), we can at least stop iterating once we've processed the number of ready sockets:</p>

{% highlight zig %}
while (true) {
    var active = polls[0..poll_count];

    // we no longer ignore the return value
    var pending = try posix.poll(active, -1);

    if (active[0].revents != 0) {
        // previous code to accept the socket and add it to polls is unchanged
        // (but I removed it to keep this snippet small)

        // add this:
        pending -= 1;
    }

    // Next two lines are changed
    var i: usize = 1;
    while (pending > 0) {
        const polled = active[i];

        const revent = polled.revents;
        if (revent == 0) {
            i += 1;
            continue;
        }

        // add this
        // we've processed one of the pending notifications
        pending -= 1;

        // the rest is the same
        // ....
    }
}
{% endhighlight %}

<p>Both epoll and kqueue, platform-specific alternatives to poll, elegantly solve this problem. We'll cover both those APIs in following parts.</p>

<h3 id=level_triggered><a href="#level_triggered" aria-hidden=true>Level Triggered</a></h3>
<p>When the listening socket is ready to accept a new connection, that is when <code>polls[0].revents != 0</code>, we call <code>accept</code> once. But because our socket is in non-blocking mode, we could call it until we receive an <code>error.WouldBlock</code> error:</p>

{% highlight zig %}
if (active[0].revents != 0) {
    while (true) {
        const socket = posix.accept(listener, null, null, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => break,
            else => return err,
        };
        polls[poll_count] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        poll_count += 1;
    }
}
{% endhighlight %}

<p><code>poll</code> tells us the listening socket can accept without blocking (i.e. that it's ready). But it doesn't tell us how many pending connections there are waiting to be accepted. Both approaches, looping and not looping, work because poll is always level-triggered. This means that <code>poll</code> continues to notify us so long as the socket is ready. This is different than edge-triggered which only notifies us when the state changes (we'll learn more about this in future parts).</p>

<p>Put differently, if there are four connections waiting to be accepted when <code>poll</code> returns, but we only accept one, the next call to <code>poll</code> will re-notify us that our listening socket is ready because of the still-pending three connections.</p>
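
<p>We can see level-triggered behavior without a server at all. The following is a small sketch that uses a pipe instead of a socket, and a timeout of <code>0</code> so that <code>poll</code> returns immediately. The <code>readyCounts</code> function is just a name picked for this example:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical demo function: polls the read end of a pipe three times
// and returns how many descriptors poll reported as ready each time.
fn readyCounts() ![3]usize {
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    var polls = [_]posix.pollfd{.{
        .fd = fds[0],
        .events = posix.POLL.IN,
        .revents = 0,
    }};

    _ = try posix.write(fds[1], "hi");

    // there's data waiting, so the read end is ready
    const first = try posix.poll(&polls, 0);

    // we haven't read anything, so poll tells us again
    const second = try posix.poll(&polls, 0);

    // drain the pipe; now there's nothing left to be notified about
    var buf: [16]u8 = undefined;
    _ = try posix.read(fds[0], &buf);
    const third = try posix.poll(&polls, 0);

    return .{ first, second, third };
}

pub fn main() !void {
    const counts = try readyCounts();
    std.debug.print("{d} {d} {d}\n", .{ counts[0], counts[1], counts[2] });
}
{% endhighlight %}

<p>The second call is the interesting one: nothing changed between the first and second <code>poll</code>, yet we're notified both times because the data is still there.</p>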

<h3 id=stateful_reads><a href="#stateful_reads" aria-hidden=true>Stateful Reads</a></h3>
<p>In the above code, when a socket is ready, we simply print whatever bytes we were able to read. But as we've discussed a few times, TCP deals with streams of bytes and isn't "message"-aware. We need to add more state so that we can handle a <code>read</code> returning only part of a message or more than a single message. We already saw a pseudo-implementation of this above, when we looked at non-blocking I/O. To use that same approach, we have a minor problem: <code>poll</code> doesn't allow us to associate arbitrary data with the <code>pollfd</code>, so we need to create and manage this association ourselves.</p>

<p>I think it's time that we introduced a <code>Server</code> struct to help us keep our code tidy. First its fields, <code>init</code> and <code>deinit</code> functions. We'll see a full <code>Client</code> implementation shortly, but for now, it's just a thin wrapper around <code>Reader</code> from previous parts.</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

// We're going to start logging errors to a scope logger
const log = std.log.scoped(.tcp_demo);

const Server = struct {
    // Our Clients need an allocator to create their read buffer
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    // polls[0] is always our listening socket
    polls: []posix.pollfd,

    // list of clients, only client[0..connected] are valid
    clients: []Client,

    // This is always polls[1..] and it's used so that we can manipulate
    // clients and client_polls together. Necessary because polls[0] is the
    // listening socket, and we don't ever touch that.
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        // TODO: Close connected sockets? We'll talk about shutdowns in
        // a future part.
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }
};
{% endhighlight %}

<p>The first client that connects will obviously be at <code>clients[0]</code>, but it'll be at <code>polls[1]</code> because our listening socket is always at <code>polls[0]</code>. This isn't a requirement, it's merely our own convention. But given that we want to poll both our listening socket and connected sockets, it's an efficient way to do things. However, it means that <code>clients[N]</code> corresponds to <code>polls[N+1]</code>. This +1 offset is error prone and annoying, so we also create a <code>client_polls</code> which will be set to <code>polls[1..]</code>. Now when clients connect and disconnect, we just touch <code>clients</code> and <code>client_polls</code> which both share the same offset.</p>
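
<p>This works because a slice doesn't copy anything, it's a view into the same memory. As a tiny sketch (the <code>aliasDemo</code> name and the file descriptor value <code>42</code> are made up, and nothing here is passed to the OS), writing through <code>client_polls[0]</code> changes <code>polls[1]</code>:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical demo: writes through the offset slice, reads through
// the original array.
fn aliasDemo() posix.fd_t {
    var polls: [3]posix.pollfd = undefined;

    // same memory as polls, just starting one entry later
    const client_polls = polls[1..];

    // "first client" goes in at index 0 of client_polls...
    client_polls[0] = .{
        .fd = 42, // made-up value for illustration
        .events = posix.POLL.IN,
        .revents = 0,
    };

    // ...and shows up at index 1 of polls
    return polls[1].fd;
}

pub fn main() void {
    std.debug.print("polls[1].fd = {d}\n", .{aliasDemo()});
}
{% endhighlight %}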

<p>Next we can add a <code>run</code> function. This is almost the same code as our first example with <code>poll</code>, but I've extracted some of the functionality into their own functions (which we'll see next):</p>

{% highlight zig %}
fn run(self: *Server, address: std.net.Address) !void {
    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // One of the polling slots (the first one) is reserved for our listening
    // socket.
    self.polls[0] = .{
        .fd = listener,
        .revents = 0,
        .events = posix.POLL.IN,
    };

    while (true) {
        // Oops, still have a +1 here, since we want to poll all our connected
        // clients, plus our listening socket
        _ = try posix.poll(self.polls[0..self.connected + 1], -1);

        if (self.polls[0].revents != 0) {
            // listening socket is ready
            self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
        }

        var i: usize = 0;
        while (i < self.connected) {
            const revents = self.client_polls[i].revents;
            if (revents == 0) {
                // this socket isn't ready, move on to the next one
                i += 1;
                continue;
            }

            const client = &self.clients[i];
            if (revents & posix.POLL.IN == posix.POLL.IN) {
                // this socket is ready to be read
                while (true) {
                    const msg = client.readMessage() catch {
                        // we don't increment `i` when we remove the client
                        // because removeClient does a swap and puts the last
                        // client at position i
                        self.removeClient(i);
                        break;
                    } orelse {
                        // no more messages, but this client is still connected
                        i += 1;
                        break;
                    };
                    std.debug.print("got: {s}\n", .{msg});
                }
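            } else {
                // revents is set but POLL.IN isn't (e.g. POLL.HUP or POLL.ERR),
                // so treat the connection as gone. Without this branch `i`
                // would never advance and we'd loop forever.
                self.removeClient(i);
            }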
        }
    }
}
{% endhighlight %}

<p>The goal here is that the client at <code>clients[N]</code> is being monitored by <code>polls[N+1]</code>. But, to avoid that nasty +1, we use <code>client_polls</code> which is a slice of <code>polls</code>: <code>polls[1..]</code>. This means that when <code>client_polls[N]</code> is ready, we can access the corresponding client at <code>clients[N]</code>.</p>

<p>I'm not sure I love the idea of extracting <code>accept</code> and <code>removeClient</code> into their own functions. After all, they're only called from a single place and I like being able to read code without having to chase after it. But, ask me another day and I'll give you another answer:</p>

{% highlight zig %}
fn accept(self: *Server, listener: posix.socket_t) !void {
    while (true) {
        // we'll continue to accept until we get error.WouldBlock
        // or until our program crashes because we overflow self.clients and self.polls
        // (we really should fix that!)

        var address: net.Address = undefined;
        var address_len: posix.socklen_t = @sizeOf(net.Address);
        const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
            error.WouldBlock => return,
            else => return err,
        };

        const client = Client.init(self.allocator, socket, address) catch |err| {
            posix.close(socket);
            log.err("failed to initialize client: {}", .{err});
            return;
        };

        const connected = self.connected;
        self.clients[connected] = client;
        self.client_polls[connected] = .{
            .fd = socket,
            .revents = 0,
            .events = posix.POLL.IN,
        };
        self.connected = connected + 1;
    }
}

fn removeClient(self: *Server, at: usize) void {
    var client = self.clients[at];
    posix.close(client.socket);
    client.deinit(self.allocator);

    // Swap the client we're removing with the last one
    // So that when we set connected -= 1, it'll effectively "remove"
    // the client from our slices.
    const last_index = self.connected - 1;
    self.clients[at] = self.clients[last_index];
    self.client_polls[at] = self.client_polls[last_index];

    self.connected = last_index;
}
{% endhighlight %}

<p>It seems like a lot more code, but the approach is the same as our initial example that used <code>poll</code>. Rather than having two slices, <code>clients</code> and <code>client_polls</code> which are linked by index, we could have used a <code>std.AutoHashMap(posix.socket_t, Client)</code>. Then the code would have looked something like:</p>

{% highlight zig %}
for (self.polls[1..]) |p| {
    const revents = p.revents;
    if (revents == 0) {
        // same as before
        continue;
    }
    const client = self.clients.getPtr(p.fd) orelse unreachable;
    // Like before, we now have a *Client for the socket which is ready
}
{% endhighlight %}

<p>Other parts of the code would also have been simplified - hashmaps have a way of doing that. But it isn't how I'd do it - because it would probably be less efficient - and this series isn't about taking the easy path.</p>

<h3 id=conclusion><a href="#conclusion" aria-hidden=true>Conclusion</a></h3>
<p>Although the <code>poll</code> system call is simple, we had to make substantial changes to our code, and our mindset, to accommodate this different way of interacting with sockets. State becomes both more critical and challenging to maintain compared to previous approaches where we had a thread-per-connection.</p>

<p>The next part will continue to expand on the above, adding writes, timeouts, connection limits and looking at how we can combine what we've learnt here with our previous experience with multi-threading. Future parts will then look at platform-specific alternatives to <code>poll</code>: <code>epoll</code> for Linux and <code>kqueue</code> for BSD/MacOS. These are not only faster and more scalable than <code>poll</code> but also offer additional features. However, fundamentally, most of what we're learning here will translate directly to those APIs. Although it's relatively simple to build a system that supports both platform-specific APIs, <code>poll</code> has the significant benefit of being cross-platform. If you don't need the extra performance/scale or features of the platform-specific APIs, I suggest you stick with <code>poll</code>.</p>

<p>As a quick aside, there's also the <code>select</code> system call which is older and more limited than <code>poll</code>. Unless you're targeting a very old platform, you should always use <code>poll</code> instead of <code>select</code>. But you will see "select" mentioned/referenced now and again, so it's good to at least be aware of it.</p>

<div class=pager>
  <a class=prev href="/TCP-Server-In-Zig-Part-4-Multithreading/">Multithreading</a>
  <a class=next href=/TCP-Server-In-Zig-Part-5b-Poll/>Poll (Part 2)</a>
</div>


<h3 id=appendix-a><a href="#appendix-a" aria-hidden=true>Appendix A - Code</a></h3>
{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
}

const Server = struct {
    // creates our polls and clients slices and is passed to Client.init
    // for it to create our read buffer.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    // polls[0] is always our listening socket
    polls: []posix.pollfd,


    // list of clients, only clients[0..connected] are valid
    clients: []Client,

    // This is always polls[1..] and it's used so that we can manipulate
    // clients and client_polls together. Necessary because polls[0] is the
    // listening socket, and we don't ever touch that.
    client_polls: []posix.pollfd,

    fn init(allocator: Allocator, max: usize) !Server {
        // + 1 for the listening socket
        const polls = try allocator.alloc(posix.pollfd, max + 1);
        errdefer allocator.free(polls);

        const clients = try allocator.alloc(Client, max);
        errdefer allocator.free(clients);

        return .{
            .polls = polls,
            .clients = clients,
            .client_polls = polls[1..],
            .connected = 0,
            .allocator = allocator,
        };
    }

    fn deinit(self: *Server) void {
        // TODO: Close connected sockets? We'll talk about shutdowns in
        // a future part.
        self.allocator.free(self.polls);
        self.allocator.free(self.clients);
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);

        // One of the polling slots (the first one) is reserved for our listening
        // socket.
        self.polls[0] = .{
            .fd = listener,
            .revents = 0,
            .events = posix.POLL.IN,
        };

        while (true) {
            // Oops, still have a +1 here, since we want to poll all our connected
            // clients, plus our listening socket
            _ = try posix.poll(self.polls[0..self.connected + 1], -1);

            if (self.polls[0].revents != 0) {
                // listening socket is ready
                self.accept(listener) catch |err| log.err("failed to accept: {}", .{err});
            }

            var i: usize = 0;
            while (i < self.connected) {
                const revents = self.client_polls[i].revents;
                if (revents == 0) {
                    // this socket isn't ready, move on to the next one
                    i += 1;
                    continue;
                }

                var client = &self.clients[i];
                if (revents & posix.POLL.IN == posix.POLL.IN) {
                    // this socket is ready to be read
                    while (true) {
                        const msg = client.readMessage() catch {
                            // we don't increment `i` when we remove the client
                            // because removeClient does a swap and puts the last
                            // client at position i
                            self.removeClient(i);
                            break;
                        } orelse {
                            // no more messages, but this client is still connected
                            i += 1;
                            break;
                        };

                        std.debug.print("got: {s}\n", .{msg});
                    }
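                } else {
                    // revents is set, but POLL.IN isn't (e.g. only POLL.ERR or
                    // POLL.HUP). There's nothing to read, so we drop the client.
                    // As above, removeClient swaps the last client into
                    // position i, so we don't increment `i`. Without this
                    // branch, `i` would never advance and we'd loop forever.
                    self.removeClient(i);
                }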
            }
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        while (true) {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = Client.init(self.allocator, socket, address) catch |err| {
                posix.close(socket);
                log.err("failed to initialize client: {}", .{err});
                return;
            };

            const connected = self.connected;
            self.clients[connected] = client;
            self.client_polls[connected] = .{
                .fd = socket,
                .revents = 0,
                .events = posix.POLL.IN,
            };
            self.connected = connected + 1;
        }
    }

    fn removeClient(self: *Server, at: usize) void {
        var client = self.clients[at];

        posix.close(client.socket);
        client.deinit(self.allocator);

        // Swap the client we're removing with the last one
        // So that when we set connected -= 1, it'll effectively "remove"
        // the client from our slices.
        const last_index = self.connected - 1;
        self.clients[at] = self.clients[last_index];
        self.client_polls[at] = self.client_polls[last_index];

        self.connected = last_index;
    }
};


const Client = struct {
    reader: Reader,
    socket: posix.socket_t,
    address: std.net.Address,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        return .{
            .reader = reader,
            .socket = socket,
            .address = address,
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        const buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
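        if (unprocessed.len < 4) {
            // ensureSpace checks the space available from `start` (which
            // includes what we've already read), so we ask for the full
            // 4-byte prefix and not just the bytes that are still missing.
            // Otherwise we could end up reading into an empty slice.
            self.ensureSpace(4) catch unreachable;
            return null;
        }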

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

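        // the length of our message + the length of our prefix. We widen to
        // usize first so that, on 64-bit targets, a huge length prefix can't
        // overflow the u32 addition; ensureSpace will reject it instead.
        const total_len = @as(usize, message_len) + 4;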

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};
{% endhighlight %}
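
<p>If you want to try the server above, here's a minimal client sketch. It isn't part of the server: <code>frame</code> is a hypothetical helper that I'm introducing only for illustration, and the sketch assumes the same Zig version as the rest of this post. It encodes each message the way our <code>Reader</code> expects it, a 4-byte little-endian length prefix followed by the payload, and sends two messages to the address the server listens on.</p>

{% highlight zig %}
const std = @import("std");

// Hypothetical helper (not part of the server above). Writes `msg` into `out`
// using the framing our Reader expects: a 4-byte little-endian length prefix
// followed by the payload. Returns the framed bytes, which point into `out`.
fn frame(out: []u8, msg: []const u8) ![]u8 {
    const total = msg.len + 4;
    if (out.len < total) {
        return error.BufferTooSmall;
    }

    const len: u32 = @intCast(msg.len);
    std.mem.writeInt(u32, out[0..4], len, .little);
    @memcpy(out[4..total], msg);
    return out[0..total];
}

pub fn main() !void {
    // same address and port as the server in this appendix
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    const stream = try std.net.tcpConnectToAddress(address);
    defer stream.close();

    // each message is sent before buf is reused for the next one
    var buf: [128]u8 = undefined;
    try stream.writeAll(try frame(&buf, "hello"));
    try stream.writeAll(try frame(&buf, "world"));
}
{% endhighlight %}

<p>With the server running, each run of this client should result in the server printing <code>got: hello</code> and <code>got: world</code>. When the client exits, the server's next read returns 0 and the client is removed.</p>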

